Spark SQL 源码分析
入口
1 | SparkSession -> |
Parser
parsePlan方法会返回一个LogicalPlan对象;
第一步,利用 antlr4 生成的 SqlBaseLexer【val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))】 对SQL进行词法分析,生成一个CommonTokenStream 对象【val tokenStream = new CommonTokenStream(lexer)】
第二步,利用 antlr4 生成的 SqlBaseParser 【val parser = new SqlBaseParser(tokenStream)】对SQL进行语法分析,得到 Unresolved LogicalPlan
以下均在 QueryExecution 中执行
Analyzer
Analyzer 持有一个 SessionCatalog 对象的引用
Analyzer 继承自 RuleExecutor[LogicalPlan],因此可以对 LogicalPlan 进行转换
1 | lazy val analyzed: LogicalPlan = { |
通过 Catalog 确定每张表对应的字段集、字段类型、数据存储位置,生成Resolved Logical Plan
1 | def checkAnalysis(plan: LogicalPlan): Unit = { |
ResolveRelations
1 | // 关联表 |
Optimizer
1 | lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData) |
逻辑优化器,会进行谓词下推,列值裁剪,常量折叠,谓词合并等等一系列逻辑优化
根据预先定义好的规则对 Resolved Logical Plan 进行优化并生成 Optimized Logical Plan
SparkPlanner
1 | lazy val sparkPlan: SparkPlan = { |
把 Logical Plan 转变为 Physical Plan
执行 Physical Plan
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
转成RDD
1 | /** Internal version of the RDD. Avoids copies and has no schema */ |
相关文章